package flinkstudy.table;

import flinkstudy.bo.EstateInfo;
import flinkstudy.table.environment.FlinkBatchQuery;
import flinkstudy.table.mock.EstateInfoCsvDataSource;
import flinkstudy.table.mock.HouseInfoCsvDataSource;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.junit.Before;
import org.junit.Test;

import static flinkstudy.table.mock.EstateInfoCsvDataSource.ESTATE_INFO_COLUMN;
import static flinkstudy.table.mock.EstateInfoCsvDataSource.T_ESTATE_INFO;
import static flinkstudy.table.mock.HouseInfoCsvDataSource.T_HOUSE_INFO;

/**
 * Table API (DSL) query examples: selection/filter/ordering, grouped
 * aggregation, and a SQL join, all against CSV-backed batch tables.
 *
 * @author daocr
 * @date 2019/12/18
 */
public class TableApi {

    private FlinkBatchQuery flinkBatchQuery;

    /**
     * Registers both source tables ({@code T_ESTATE_INFO} and
     * {@code T_HOUSE_INFO}) in the batch table environment before each test.
     */
    @Before
    public void init() {

        flinkBatchQuery = new FlinkBatchQuery();
        ExecutionEnvironment flinkBatchEnv = flinkBatchQuery.getFlinkBatchEnv();
        BatchTableEnvironment flinkBatchTableEnv = flinkBatchQuery.getFlinkBatchTableEnv();

        // 1. Obtain the estate-info data source.
        DataSource<EstateInfo> estateInfoDataSource = EstateInfoCsvDataSource.of(flinkBatchEnv);
        // 2. Convert the DataSet into a Table.
        Table estateInfoTable = flinkBatchTableEnv.fromDataSet(estateInfoDataSource);
        // 3. Register the table under its catalog name.
        flinkBatchTableEnv.registerTable(T_ESTATE_INFO, estateInfoTable);

        // Bug fix: this table was previously built from EstateInfoCsvDataSource
        // (copy-paste error), so T_HOUSE_INFO actually contained estate rows and
        // the join in joins() joined the estate table to itself. Build it from
        // the house-info source instead.
        // NOTE(review): assumes HouseInfoCsvDataSource exposes the same
        // of(ExecutionEnvironment) factory as EstateInfoCsvDataSource — confirm.
        Table houseInfoTable = flinkBatchTableEnv.fromDataSet(HouseInfoCsvDataSource.of(flinkBatchEnv));
        flinkBatchTableEnv.registerTable(T_HOUSE_INFO, houseInfoTable);
    }

    /**
     * Basic operations: scan, column selection, filtering, ordering and
     * row-count limiting via the expression DSL.
     *
     * @throws Exception if the Flink job fails
     */
    @Test
    public void operations() throws Exception {

        BatchTableEnvironment flinkBatchTableEnv = flinkBatchQuery.getFlinkBatchTableEnv();

        Table filtered = flinkBatchTableEnv
                // choose the source table
                .scan(T_ESTATE_INFO)
                // project the columns to keep
                .select(ESTATE_INFO_COLUMN)
                // filter rows; where(...) is an equivalent alternative
                .filter("houseId === 29916 && districtId === 5336")
                // sort descending by price
                .orderBy("price.desc")
                // analogous to MySQL's LIMIT
                .fetch(10);

        DataSet<EstateInfo> estateInfoDataSet = flinkBatchTableEnv.toDataSet(filtered, EstateInfo.class);

        estateInfoDataSet.print();
    }

    /**
     * Grouped aggregation: per-plate row count and average price.
     *
     * @throws Exception if the Flink job fails
     */
    @Test
    public void aggregations() throws Exception {
        BatchTableEnvironment flinkBatchTableEnv = flinkBatchQuery.getFlinkBatchTableEnv();

        Table aggregated = flinkBatchTableEnv.scan(T_ESTATE_INFO)
                // multiple grouping keys are comma-separated
                .groupBy("plateId")
                // SQL equivalent: SELECT plateId, COUNT(plateId), AVG(price) ... GROUP BY plateId
                .select("plateId,plateId.count,price.avg");

        // Result shape: (plateId, count, avgPrice).
        TypeInformation<Tuple3<Integer, Long, Long>> resultTypeInfo = TypeInformation.of(new TypeHint<Tuple3<Integer, Long, Long>>() {
        });

        DataSet<Tuple3<Integer, Long, Long>> resultDataSet = flinkBatchTableEnv.toDataSet(aggregated, resultTypeInfo);

        resultDataSet.print();
    }

    /**
     * Join query via SQL: pairs each house's city with its estate price,
     * matching on {@code houseId}.
     *
     * @throws Exception if the Flink job fails
     */
    @Test
    public void joins() throws Exception {

        BatchTableEnvironment flinkBatchTableEnv = flinkBatchQuery.getFlinkBatchTableEnv();

        Table joined = flinkBatchTableEnv.sqlQuery("select t_house_info.cityId,t_estate_info.price from t_house_info join t_estate_info on (t_house_info.houseId = t_estate_info.houseId) ");

        // Result shape: (cityId, price).
        TypeInformation<Tuple2<Integer, Long>> resultTypeInfo = TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {
        });

        DataSet<Tuple2<Integer, Long>> resultDataSet = flinkBatchTableEnv.toDataSet(joined, resultTypeInfo);

        resultDataSet.print();
    }


}
